use assert_matches::assert_matches;
use ic_base_types::{CanisterId, NumBytes, NumSeconds, PrincipalId, SubnetId};
use ic_btc_replica_types::{
    BitcoinAdapterResponse, BitcoinAdapterResponseWrapper, BitcoinReject,
    GetSuccessorsRequestInitial, GetSuccessorsResponseComplete, Network, SendTransactionRequest,
};
use ic_error_types::RejectCode;
use ic_management_canister_types_private::{
    BitcoinGetSuccessorsResponse, CanisterChange, CanisterChangeDetails, CanisterChangeOrigin,
    Payload as _,
};
use ic_registry_routing_table::{CanisterIdRange, RoutingTable};
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::canister_state::execution_state::{
    CustomSection, CustomSectionType, WasmMetadata,
};
use ic_replicated_state::metadata_state::subnet_call_context_manager::{
    BitcoinGetSuccessorsContext, BitcoinSendTransactionInternalContext, SubnetCallContext,
};
use ic_replicated_state::replicated_state::testing::ReplicatedStateTesting;
use ic_replicated_state::replicated_state::{
    MemoryTaken, PeekableOutputIterator, ReplicatedStateMessageRouting,
};
use ic_replicated_state::testing::{
    CanisterQueuesTesting, FakeDropMessageMetrics, SystemStateTesting,
};
use ic_replicated_state::{
    CanisterState, IngressHistoryState, InputSource, ReplicatedState, SchedulerState, StateError,
    SystemState,
};
use ic_test_utilities_state::{ExecutionStateBuilder, arb_replicated_state_with_output_queues};
use ic_test_utilities_types::ids::{SUBNET_1, canister_test_id, message_test_id, user_test_id};
use ic_test_utilities_types::messages::{RequestBuilder, ResponseBuilder};
use ic_types::ingress::{IngressState, IngressStatus};
use ic_types::messages::{CallbackId, Refund, RejectContext};
use ic_types::messages::{
    CanisterMessage, MAX_RESPONSE_COUNT_BYTES, Payload, Request, RequestOrResponse, Response,
};
use ic_types::time::CoarseTime;
use ic_types::time::UNIX_EPOCH;
use ic_types::xnet::StreamIndex;
use ic_types::{CountBytes, Cycles, MemoryAllocation, Time};
use maplit::btreemap;
use proptest::prelude::*;
use std::collections::{BTreeMap, VecDeque};
use std::mem::size_of;
use std::sync::Arc;
use strum::IntoEnumIterator;

/// ID of the subnet that the fixture's `ReplicatedState` is created with.
const SUBNET_ID: SubnetId = SubnetId::new(PrincipalId::new(29, [0xfc; 29]));
/// ID of the model canister that test messages are sent to and from.
const CANISTER_ID: CanisterId = CanisterId::from_u64(42);
/// ID of a second canister, used as the counterpart of `CANISTER_ID` in test messages.
const OTHER_CANISTER_ID: CanisterId = CanisterId::from_u64(13);
/// Initial value of the available guaranteed response memory passed to `push_input()`.
const SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY: i64 = i64::MAX / 2;
/// Deadline set on the best-effort messages built by the helpers below.
const SOME_DEADLINE: CoarseTime = CoarseTime::from_secs_since_unix_epoch(1);

/// Builds a request sent by `canister_id` and addressed to `CANISTER_ID`.
///
/// No deadline is set here; `best_effort_request_from` is the variant that
/// sets one.
fn request_from(canister_id: CanisterId) -> Request {
    let builder = RequestBuilder::default()
        .sender(canister_id)
        .receiver(CANISTER_ID);
    builder.build()
}

/// Builds a request sent by `canister_id` and addressed to `CANISTER_ID`,
/// with its deadline set to `SOME_DEADLINE`.
fn best_effort_request_from(canister_id: CanisterId) -> Request {
    let builder = RequestBuilder::default()
        .sender(canister_id)
        .receiver(CANISTER_ID)
        .deadline(SOME_DEADLINE);
    builder.build()
}

/// Builds a request sent by `CANISTER_ID` and addressed to `canister_id`.
///
/// No deadline is set here; `best_effort_request_to` is the variant that
/// sets one.
fn request_to(canister_id: CanisterId) -> Request {
    let builder = RequestBuilder::default()
        .sender(CANISTER_ID)
        .receiver(canister_id);
    builder.build()
}

/// Builds a request sent by `CANISTER_ID` and addressed to `canister_id`,
/// with its deadline set to `SOME_DEADLINE`.
fn best_effort_request_to(canister_id: CanisterId) -> Request {
    let builder = RequestBuilder::default()
        .sender(CANISTER_ID)
        .receiver(canister_id)
        .deadline(SOME_DEADLINE);
    builder.build()
}

/// Builds a response whose respondent is `canister_id` and whose originator
/// is `CANISTER_ID`.
fn response_from(canister_id: CanisterId) -> Response {
    let builder = ResponseBuilder::default()
        .respondent(canister_id)
        .originator(CANISTER_ID);
    builder.build()
}

/// Builds a response whose respondent is `CANISTER_ID` and whose originator
/// is `canister_id`.
fn response_to(canister_id: CanisterId) -> Response {
    let builder = ResponseBuilder::default()
        .respondent(CANISTER_ID)
        .originator(canister_id);
    builder.build()
}

/// Builds a response whose respondent is `CANISTER_ID` and whose originator
/// is `canister_id`, with its deadline set to `SOME_DEADLINE`.
fn best_effort_response_to(canister_id: CanisterId) -> Response {
    let builder = ResponseBuilder::default()
        .respondent(CANISTER_ID)
        .originator(canister_id)
        .deadline(SOME_DEADLINE);
    builder.build()
}

/// Calls `time_out_messages()` on `state` with fresh fake metrics and returns
/// the sum of all `timed_out_messages` counts recorded by those metrics.
fn time_out_messages(state: &mut ReplicatedState) -> usize {
    let metrics = FakeDropMessageMetrics::default();
    state.time_out_messages(&metrics);

    // Sum into a local so that the `RefCell` borrow ends before `metrics` is dropped.
    let timed_out_total: usize = metrics.timed_out_messages.borrow().values().sum();
    timed_out_total
}

/// Fixture using `SUBNET_ID` as its own subnet ID and `CANISTER_ID` as the ID
/// for the model canister used to send requests (responses) to and from.
/// Such messages are generated by the functions `request_from` et al.
struct ReplicatedStateFixture {
    /// The replicated state under test.
    state: ReplicatedState,
}

impl ReplicatedStateFixture {
    fn new() -> ReplicatedStateFixture {
        Self::with_canisters(&[CANISTER_ID])
    }

    pub fn with_canisters(canister_ids: &[CanisterId]) -> ReplicatedStateFixture {
        Self::with_wasm_metadata(canister_ids, WasmMetadata::new(BTreeMap::new()))
    }

    pub fn with_wasm_metadata(
        canister_ids: &[CanisterId],
        wasm_metadata: WasmMetadata,
    ) -> ReplicatedStateFixture {
        let mut state = ReplicatedState::new(SUBNET_ID, SubnetType::Application);
        for canister_id in canister_ids {
            let scheduler_state = SchedulerState::default();
            let system_state = SystemState::new_running_for_testing(
                *canister_id,
                user_test_id(24).get(),
                Cycles::new(1 << 36),
                NumSeconds::from(100_000),
            );
            let execution_state = ExecutionStateBuilder::default()
                .with_wasm_metadata(wasm_metadata.clone())
                .build();
            state.put_canister_state(CanisterState::new(
                system_state,
                Some(execution_state),
                scheduler_state,
            ));
        }
        ReplicatedStateFixture { state }
    }

    fn push_input(
        &mut self,
        msg: RequestOrResponse,
    ) -> Result<bool, (StateError, RequestOrResponse)> {
        self.state.push_input(
            msg,
            &mut SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY.clone(),
        )
    }

    fn pop_input(&mut self) -> Option<CanisterMessage> {
        self.state
            .canister_state_mut(&CANISTER_ID)
            .unwrap()
            .pop_input()
    }

    fn push_output_request(
        &mut self,
        request: Request,
        time: Time,
    ) -> Result<(), (StateError, Arc<Request>)> {
        self.state
            .canister_state_mut(&CANISTER_ID)
            .unwrap()
            .push_output_request(request.into(), time)
    }

    fn push_output_response(&mut self, response: Response) {
        self.state
            .canister_state_mut(&CANISTER_ID)
            .unwrap()
            .push_output_response(response.into());
    }

    fn pop_output(&mut self) -> Option<RequestOrResponse> {
        self.state
            .canister_state_mut(&CANISTER_ID)
            .unwrap()
            .output_into_iter()
            .pop()
    }

    /// Push the message to a stream.
    fn push_to_stream(&mut self, msgs: Vec<RequestOrResponse>) {
        let mut streams = self.state.take_streams();
        for msg in msgs.into_iter() {
            streams.entry(SUBNET_ID).or_default().push(msg.into());
        }
        self.state.put_streams(streams);
    }

    /// Discard the next `count` messages from the same stream as `push_to_stream` uses.
    fn discard_from_stream(&mut self, count: u64) {
        let mut streams = self.state.take_streams();
        let stream = streams.entry(SUBNET_ID).or_default();
        let old_begin = stream.messages_begin();
        stream.discard_messages_before(StreamIndex::from(old_begin.get() + count), &vec![].into());
        self.state.put_streams(streams);
    }

    fn stop_canister(&mut self) {
        let canister = self.state.canister_state_mut(&CANISTER_ID).unwrap();
        canister
            .system_state
            .begin_stopping(ic_types::messages::StopCanisterContext::Ingress {
                sender: user_test_id(24),
                message_id: [0; 32].into(),
                call_id: None,
            });
        assert!(canister.system_state.try_stop_canister(|_| false).0);
    }

    fn memory_taken(&self) -> MemoryTaken {
        self.state.memory_taken()
    }

    fn guaranteed_response_message_memory_taken(&self) -> NumBytes {
        self.state.guaranteed_response_message_memory_taken()
    }

    fn best_effort_message_memory_taken(&self) -> NumBytes {
        self.state.best_effort_message_memory_taken()
    }

    fn remote_subnet_input_schedule(&self, canister: &CanisterId) -> &VecDeque<CanisterId> {
        self.state
            .canister_state(canister)
            .unwrap()
            .system_state
            .queues()
            .remote_sender_schedule()
    }

    fn local_subnet_input_schedule(&self, canister: &CanisterId) -> &VecDeque<CanisterId> {
        self.state
            .canister_state(canister)
            .unwrap()
            .system_state
            .queues()
            .local_sender_schedule()
    }

    fn time_out_messages(&mut self) -> usize {
        time_out_messages(&mut self.state)
    }

    fn enforce_best_effort_message_limit(&mut self, limit: NumBytes) -> (usize, NumBytes) {
        let metrics = FakeDropMessageMetrics::default();
        self.state
            .enforce_best_effort_message_limit(limit, &metrics);
        let shed_messages = metrics.shed_messages.borrow().values().sum();
        let shed_message_bytes: usize = metrics.shed_message_bytes.borrow().values().sum();
        (shed_messages, (shed_message_bytes as u64).into())
    }

    fn canister_balance(&self, canister_id: &CanisterId) -> Option<Cycles> {
        self.state
            .canister_state(canister_id)?
            .system_state
            .balance()
            .into()
    }

    fn take_all_refunds(&mut self) -> Vec<Refund> {
        let mut refunds = Vec::with_capacity(self.state.refunds().len());
        self.state.take_refunds(|refund| {
            refunds.push(*refund);
            true
        });
        assert!(self.state.refunds().is_empty());
        refunds
    }
}

/// Asserts that the execution memory reported by `fixture.memory_taken()`
/// equals `total_memory_usage` bytes.
fn assert_execution_memory_taken(total_memory_usage: usize, fixture: &ReplicatedStateFixture) {
    let expected = total_memory_usage as u64;
    let actual = fixture.memory_taken().execution().get();
    assert_eq!(expected, actual);
}

/// Asserts the message memory usage reported by `fixture`.
///
/// Guaranteed response and best-effort usage are each checked against both
/// `memory_taken()` and the dedicated fixture accessor; `messages_total()` is
/// checked against their sum.
fn assert_message_memory_taken(
    guaranteed_response_memory_usage: usize,
    best_effort_memory_usage: usize,
    fixture: &ReplicatedStateFixture,
) {
    let expected_guaranteed = guaranteed_response_memory_usage as u64;
    let expected_best_effort = best_effort_memory_usage as u64;
    let expected_total = expected_guaranteed + expected_best_effort;

    // Guaranteed response message memory.
    let guaranteed_from_memory_taken = fixture.memory_taken().guaranteed_response_messages().get();
    assert_eq!(expected_guaranteed, guaranteed_from_memory_taken);
    let guaranteed_from_accessor = fixture.guaranteed_response_message_memory_taken().get();
    assert_eq!(expected_guaranteed, guaranteed_from_accessor);

    // Best-effort message memory.
    let best_effort_from_memory_taken = fixture.memory_taken().best_effort_messages().get();
    assert_eq!(expected_best_effort, best_effort_from_memory_taken);
    let best_effort_from_accessor = fixture.best_effort_message_memory_taken().get();
    assert_eq!(expected_best_effort, best_effort_from_accessor);

    // Total message memory.
    let total = fixture.memory_taken().messages_total().get();
    assert_eq!(expected_total, total);
}

/// Asserts that the canister history memory reported by
/// `fixture.memory_taken()` equals `canister_history_memory_usage` bytes.
fn assert_canister_history_memory_taken(
    canister_history_memory_usage: usize,
    fixture: &ReplicatedStateFixture,
) {
    let expected = canister_history_memory_usage as u64;
    let actual = fixture.memory_taken().canister_history().get();
    assert_eq!(expected, actual);
}

/// Asserts that the Wasm custom sections memory reported by
/// `fixture.memory_taken()` equals `wasm_custom_sections_memory_usage` bytes.
fn assert_wasm_custom_sections_memory_taken(
    wasm_custom_sections_memory_usage: u64,
    fixture: &ReplicatedStateFixture,
) {
    let actual = fixture.memory_taken().wasm_custom_sections().get();
    assert_eq!(wasm_custom_sections_memory_usage, actual);
}

/// Asserts that `actual` equals the initial available guaranteed response
/// memory minus `guaranteed_response_memory_usage`.
fn assert_subnet_available_guaranteed_response_memory(
    initial_available_guaranteed_response_memory: i64,
    guaranteed_response_memory_usage: usize,
    actual: i64,
) {
    let used = guaranteed_response_memory_usage as i64;
    let expected = initial_available_guaranteed_response_memory - used;
    assert_eq!(expected, actual);
}

#[test]
fn memory_taken_by_canister_queues() {
    /// Asserts the given message memory usage, and that no execution, canister
    /// history or Wasm custom sections memory is in use.
    fn assert_only_message_memory_taken(
        guaranteed_response: usize,
        best_effort: usize,
        fixture: &ReplicatedStateFixture,
    ) {
        assert_execution_memory_taken(0, fixture);
        assert_message_memory_taken(guaranteed_response, best_effort, fixture);
        assert_canister_history_memory_taken(0, fixture);
        assert_wasm_custom_sections_memory_taken(0, fixture);
    }

    let mut fixture = ReplicatedStateFixture::new();
    let mut available_memory = SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY;

    // Nothing is used before any message is enqueued.
    assert_only_message_memory_taken(0, 0, &fixture);

    // Enqueue a request in a canister input queue.
    let pushed = fixture
        .state
        .push_input(request_from(OTHER_CANISTER_ID).into(), &mut available_memory)
        .unwrap();
    assert!(pushed);

    // Memory is reserved for one response.
    assert_only_message_memory_taken(MAX_RESPONSE_COUNT_BYTES, 0, &fixture);
    assert_subnet_available_guaranteed_response_memory(
        SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY,
        MAX_RESPONSE_COUNT_BYTES,
        available_memory,
    );

    // Popping the request leaves the memory usage unchanged.
    assert!(fixture.pop_input().is_some());
    assert_only_message_memory_taken(MAX_RESPONSE_COUNT_BYTES, 0, &fixture);

    // Enqueue a response in the output queue.
    let response = response_to(OTHER_CANISTER_ID);
    let response_bytes = response.count_bytes();
    fixture.push_output_response(response);

    // Only the response itself takes up memory now.
    assert_only_message_memory_taken(response_bytes, 0, &fixture);

    // Enqueue a best-effort request in a canister input queue.
    let request = best_effort_request_from(OTHER_CANISTER_ID);
    let request_bytes = request.count_bytes();
    let pushed = fixture
        .state
        .push_input(request.into(), &mut available_memory)
        .unwrap();
    assert!(pushed);

    // Best-effort memory is used by the request (no reservation); the
    // available guaranteed response memory is unchanged.
    assert_only_message_memory_taken(response_bytes, request_bytes, &fixture);
    assert_subnet_available_guaranteed_response_memory(
        SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY,
        MAX_RESPONSE_COUNT_BYTES,
        available_memory,
    );

    // Popping the best-effort request brings best-effort usage back to zero.
    assert!(fixture.pop_input().is_some());
    assert_only_message_memory_taken(response_bytes, 0, &fixture);
}

#[test]
fn memory_taken_by_subnet_queues() {
    /// Asserts the given message memory usage, and that no execution, canister
    /// history or Wasm custom sections memory is in use.
    fn assert_only_message_memory_taken(
        guaranteed_response: usize,
        best_effort: usize,
        fixture: &ReplicatedStateFixture,
    ) {
        assert_execution_memory_taken(0, fixture);
        assert_message_memory_taken(guaranteed_response, best_effort, fixture);
        assert_canister_history_memory_taken(0, fixture);
        assert_wasm_custom_sections_memory_taken(0, fixture);
    }

    let mut fixture = ReplicatedStateFixture::new();
    let mut available_memory = SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY;

    // Nothing is used before any message is enqueued.
    assert_only_message_memory_taken(0, 0, &fixture);

    // Enqueue a guaranteed response request in the subnet input queues.
    let pushed = fixture
        .state
        .push_input(request_to(SUBNET_ID.into()).into(), &mut available_memory)
        .unwrap();
    assert!(pushed);

    // Memory is reserved for one response.
    assert_only_message_memory_taken(MAX_RESPONSE_COUNT_BYTES, 0, &fixture);
    assert_subnet_available_guaranteed_response_memory(
        SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY,
        MAX_RESPONSE_COUNT_BYTES,
        available_memory,
    );

    // Popping the subnet input request leaves the memory usage unchanged.
    assert!(fixture.state.pop_subnet_input().is_some());
    assert_only_message_memory_taken(MAX_RESPONSE_COUNT_BYTES, 0, &fixture);

    // Enqueue a response in the subnet output queues.
    let response = response_from(SUBNET_ID.into());
    let response_bytes = response.count_bytes();
    fixture.state.push_subnet_output_response(response.into());

    // Only the response itself takes up memory now.
    assert_only_message_memory_taken(response_bytes, 0, &fixture);

    // Enqueue a best-effort request in the subnet input queues.
    let request = best_effort_request_to(SUBNET_ID.into());
    let request_bytes = request.count_bytes();
    let pushed = fixture
        .state
        .push_input(request.into(), &mut available_memory)
        .unwrap();
    assert!(pushed);

    // Best-effort memory is used by the request (no reservation); the
    // available guaranteed response memory is unchanged.
    assert_only_message_memory_taken(response_bytes, request_bytes, &fixture);
    assert_subnet_available_guaranteed_response_memory(
        SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY,
        MAX_RESPONSE_COUNT_BYTES,
        available_memory,
    );

    // Popping the best-effort request brings best-effort usage back to zero.
    assert!(fixture.state.pop_subnet_input().is_some());
    assert_only_message_memory_taken(response_bytes, 0, &fixture);
}

#[test]
fn memory_taken_by_wasm_custom_sections() {
    // A single private 10 KiB custom section named "candid".
    let candid_section = CustomSection::new(CustomSectionType::Private, vec![0; 10 * 1024]);
    let custom_sections: BTreeMap<String, CustomSection> =
        BTreeMap::from([(String::from("candid"), candid_section)]);
    let wasm_metadata = WasmMetadata::new(custom_sections);
    let wasm_bytes = wasm_metadata.memory_usage().get();

    let mut fixture = ReplicatedStateFixture::with_wasm_metadata(&[CANISTER_ID], wasm_metadata);
    let mut available_memory = SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY;

    // Initially, the only memory in use is that of the wasm custom sections.
    assert_execution_memory_taken(wasm_bytes as usize, &fixture);
    assert_wasm_custom_sections_memory_taken(wasm_bytes, &fixture);

    // Enqueue a request in a canister input queue.
    let pushed = fixture
        .state
        .push_input(request_from(OTHER_CANISTER_ID).into(), &mut available_memory)
        .unwrap();
    assert!(pushed);

    // Memory is reserved for one response; custom sections usage is unchanged.
    assert_execution_memory_taken(wasm_bytes as usize, &fixture);
    assert_message_memory_taken(MAX_RESPONSE_COUNT_BYTES, 0, &fixture);
    assert_canister_history_memory_taken(0, &fixture);
    assert_wasm_custom_sections_memory_taken(wasm_bytes, &fixture);
    assert_subnet_available_guaranteed_response_memory(
        SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY,
        MAX_RESPONSE_COUNT_BYTES,
        available_memory,
    );
}

#[test]
fn memory_taken_by_canister_history() {
    /// Sets the memory allocation of the model canister.
    fn set_memory_allocation(fixture: &mut ReplicatedStateFixture, allocation: MemoryAllocation) {
        let canister_state = fixture.state.canister_state_mut(&CANISTER_ID).unwrap();
        canister_state.system_state.memory_allocation = allocation;
    }

    /// Asserts the execution and canister history memory usage.
    fn assert_memory_taken(execution: usize, history: usize, fixture: &ReplicatedStateFixture) {
        assert_execution_memory_taken(execution, fixture);
        assert_canister_history_memory_taken(history, fixture);
    }

    let mut fixture = ReplicatedStateFixture::with_wasm_metadata(
        &[CANISTER_ID],
        WasmMetadata::new(BTreeMap::new()),
    );

    // Nothing is used initially.
    assert_execution_memory_taken(0, &fixture);
    assert_message_memory_taken(0, 0, &fixture);
    assert_canister_history_memory_taken(0, &fixture);
    assert_wasm_custom_sections_memory_taken(0, &fixture);

    // Expected memory for two canister changes, each carrying two principals.
    let memory_per_change = size_of::<CanisterChange>() + 2 * size_of::<PrincipalId>();
    let canister_history_memory: usize = 2 * memory_per_change;

    // Record two canister changes in the canister history.
    let creation = CanisterChangeDetails::canister_creation(
        vec![canister_test_id(777).get(), user_test_id(42).get()],
        None,
    );
    let controllers_change = CanisterChangeDetails::controllers_change(vec![
        canister_test_id(0).get(),
        canister_test_id(1).get(),
    ]);
    let canister_state = fixture.state.canister_state_mut(&CANISTER_ID).unwrap();
    canister_state.system_state.add_canister_change(
        Time::from_nanos_since_unix_epoch(0),
        CanisterChangeOrigin::from_user(user_test_id(42).get()),
        creation,
    );
    canister_state.system_state.add_canister_change(
        Time::from_nanos_since_unix_epoch(16),
        CanisterChangeOrigin::from_user(user_test_id(123).get()),
        controllers_change,
    );
    assert_memory_taken(canister_history_memory, canister_history_memory, &fixture);

    // A small fixed memory allocation: execution memory is still the history usage.
    set_memory_allocation(&mut fixture, MemoryAllocation::from(NumBytes::from(2)));
    assert_memory_taken(canister_history_memory, canister_history_memory, &fixture);

    // A large fixed memory allocation: execution memory is the allocation.
    set_memory_allocation(&mut fixture, MemoryAllocation::from(NumBytes::from(888)));
    assert_memory_taken(888, canister_history_memory, &fixture);

    // Back to the default memory allocation.
    set_memory_allocation(&mut fixture, MemoryAllocation::default());

    // Same expectations on a system subnet.
    fixture.state.metadata.own_subnet_type = SubnetType::System;
    assert_memory_taken(canister_history_memory, canister_history_memory, &fixture);
}

#[test]
fn push_subnet_queues_input_respects_subnet_available_guaranteed_response_memory() {
    /// Asserts the given message memory usage, and that no execution, canister
    /// history or Wasm custom sections memory is in use.
    fn assert_only_message_memory_taken(
        guaranteed_response: usize,
        best_effort: usize,
        fixture: &ReplicatedStateFixture,
    ) {
        assert_execution_memory_taken(0, fixture);
        assert_message_memory_taken(guaranteed_response, best_effort, fixture);
        assert_canister_history_memory_taken(0, fixture);
        assert_wasm_custom_sections_memory_taken(0, fixture);
    }

    let mut fixture = ReplicatedStateFixture::new();
    // Exactly enough available memory for one response reservation.
    let initial_available_memory = MAX_RESPONSE_COUNT_BYTES as i64;
    let mut available_memory = initial_available_memory;
    let subnet_canister_id: CanisterId = SUBNET_ID.into();

    // Nothing is used before any message is enqueued.
    assert_only_message_memory_taken(0, 0, &fixture);

    // Enqueue a guaranteed response request in the subnet input queues.
    let pushed = fixture
        .state
        .push_input(request_to(subnet_canister_id).into(), &mut available_memory)
        .unwrap();
    assert!(pushed);

    // Memory is reserved for one response.
    assert_only_message_memory_taken(MAX_RESPONSE_COUNT_BYTES, 0, &fixture);
    assert_subnet_available_guaranteed_response_memory(
        initial_available_memory,
        MAX_RESPONSE_COUNT_BYTES,
        available_memory,
    );

    // Try to enqueue a second guaranteed response request.
    let res = fixture
        .state
        .push_input(request_to(subnet_canister_id).into(), &mut available_memory);

    // There is no memory left for a second guaranteed response request.
    let expected_error = StateError::OutOfMemory {
        requested: (MAX_RESPONSE_COUNT_BYTES as u64).into(),
        available: 0.into(),
    };
    assert_eq!(
        Err((expected_error, request_to(subnet_canister_id).into())),
        res
    );

    // The failed push changed nothing.
    assert_only_message_memory_taken(MAX_RESPONSE_COUNT_BYTES, 0, &fixture);
    assert_eq!(0, available_memory);

    // Enqueue a best-effort request in the subnet input queues.
    let request = best_effort_request_to(subnet_canister_id);
    let request_bytes = request.count_bytes();
    let pushed = fixture
        .state
        .push_input(request.into(), &mut available_memory)
        .unwrap();
    assert!(pushed);

    // Best-effort memory is consumed by the request; everything else is unchanged.
    assert_only_message_memory_taken(MAX_RESPONSE_COUNT_BYTES, request_bytes, &fixture);
    assert_eq!(0, available_memory);
}

#[test]
fn push_input_queues_respects_local_remote_subnet() {
    let mut fixture = ReplicatedStateFixture::new();

    // The model canister has no inputs and `OTHER_CANISTER_ID` is not hosted here.
    assert!(fixture.pop_input().is_none());
    assert!(fixture.state.canister_state(&OTHER_CANISTER_ID).is_none());

    // A request from a canister that is not in the state is expected to land
    // in the remote subnet schedule.
    let from_remote_canister = request_from(OTHER_CANISTER_ID);
    assert!(fixture.push_input(from_remote_canister.into()).unwrap());
    assert_eq!(fixture.remote_subnet_input_schedule(&CANISTER_ID).len(), 1);

    // A request from the local canister itself is expected to land in the
    // local subnet schedule.
    let from_local_canister = request_from(CANISTER_ID);
    assert!(fixture.push_input(from_local_canister.into()).unwrap());
    assert_eq!(fixture.local_subnet_input_schedule(&CANISTER_ID).len(), 1);

    // A request from the local subnet is also expected to land in the local
    // subnet schedule.
    let subnet_as_canister = CanisterId::unchecked_from_principal(SUBNET_ID.get());
    let from_local_subnet = request_from(subnet_as_canister);
    assert!(fixture.push_input(from_local_subnet.into()).unwrap());
    assert_eq!(fixture.local_subnet_input_schedule(&CANISTER_ID).len(), 2);
}

#[test]
fn subnet_queue_push_input_response() {
    let mut state = ReplicatedState::new(SUBNET_ID, SubnetType::Application);
    let mut available_memory = SUBNET_AVAILABLE_GUARANTEED_RESPONSE_MEMORY;

    // Try to push a response addressed to the subnet itself.
    let response = response_to(SUBNET_ID.into());
    let result = state.push_input(response.clone().into(), &mut available_memory);

    // The push is rejected and the response is handed back with the error.
    let expected: Result<bool, (StateError, RequestOrResponse)> = Err((
        StateError::non_matching_response(
            "Management canister does not accept canister responses",
            &response,
        ),
        response.into(),
    ));
    assert_eq!(result, expected);
}

#[test]
fn insert_bitcoin_response_non_matching() {
    let mut state = ReplicatedState::new(SUBNET_ID, SubnetType::Application);

    // No call context was registered, so callback ID 0 matches nothing.
    let successors = GetSuccessorsResponseComplete {
        blocks: vec![],
        next: vec![],
    };
    let adapter_response = BitcoinAdapterResponse {
        response: BitcoinAdapterResponseWrapper::GetSuccessorsResponse(successors),
        callback_id: 0,
    };

    let result = state.push_response_bitcoin(adapter_response);
    assert_eq!(
        result,
        Err(StateError::BitcoinNonMatchingResponse { callback_id: 0 })
    );
}

#[test]
fn insert_bitcoin_response() {
    let mut state = ReplicatedState::new(SUBNET_ID, SubnetType::Application);

    // Register a `GetSuccessors` call context for the response to match.
    let context = BitcoinGetSuccessorsContext {
        request: RequestBuilder::default().build(),
        payload: GetSuccessorsRequestInitial {
            network: Network::BitcoinRegtest,
            anchor: vec![],
            processed_block_hashes: vec![],
        },
        time: UNIX_EPOCH,
    };
    state
        .metadata
        .subnet_call_context_manager
        .push_context(SubnetCallContext::BitcoinGetSuccessors(context));

    // Push the adapter's response for callback ID 0.
    let successors = GetSuccessorsResponseComplete {
        blocks: vec![],
        next: vec![],
    };
    let adapter_response = BitcoinAdapterResponse {
        response: BitcoinAdapterResponseWrapper::GetSuccessorsResponse(successors.clone()),
        callback_id: 0,
    };
    state.push_response_bitcoin(adapter_response).unwrap();

    // The first consensus queue entry carries the encoded response.
    let expected_payload =
        Payload::Data(BitcoinGetSuccessorsResponse::Complete(successors).encode());
    assert_eq!(state.consensus_queue[0].payload, expected_payload);
}

#[test]
fn insert_bitcoin_get_successor_reject_response() {
    let mut state = ReplicatedState::new(SUBNET_ID, SubnetType::Application);

    // Register a `GetSuccessors` call context for the reject to match.
    let context = BitcoinGetSuccessorsContext {
        request: RequestBuilder::default().build(),
        payload: GetSuccessorsRequestInitial {
            network: Network::BitcoinRegtest,
            anchor: vec![],
            processed_block_hashes: vec![],
        },
        time: UNIX_EPOCH,
    };
    state
        .metadata
        .subnet_call_context_manager
        .push_context(SubnetCallContext::BitcoinGetSuccessors(context));

    // Push a reject from the adapter for callback ID 0.
    let error_message = "Request failed with error.".to_string();
    let reject = BitcoinReject {
        message: error_message.clone(),
        reject_code: RejectCode::SysTransient,
    };
    let adapter_response = BitcoinAdapterResponse {
        response: BitcoinAdapterResponseWrapper::GetSuccessorsReject(reject),
        callback_id: 0,
    };
    state.push_response_bitcoin(adapter_response).unwrap();

    // The first consensus queue entry carries the same reject code and message.
    let expected_payload =
        Payload::Reject(RejectContext::new(RejectCode::SysTransient, error_message));
    assert_eq!(state.consensus_queue[0].payload, expected_payload);
}

#[test]
fn insert_bitcoin_send_transaction_reject_response() {
    let mut state = ReplicatedState::new(SUBNET_ID, SubnetType::Application);

    // Register a `SendTransactionInternal` call context for the reject to match.
    let context = BitcoinSendTransactionInternalContext {
        request: RequestBuilder::default().build(),
        payload: SendTransactionRequest {
            network: Network::BitcoinRegtest,
            transaction: vec![],
        },
        time: UNIX_EPOCH,
    };
    state
        .metadata
        .subnet_call_context_manager
        .push_context(SubnetCallContext::BitcoinSendTransactionInternal(context));

    // Push a reject from the adapter for callback ID 0.
    let error_message = "Request failed with error.".to_string();
    let reject = BitcoinReject {
        message: error_message.clone(),
        reject_code: RejectCode::SysTransient,
    };
    let adapter_response = BitcoinAdapterResponse {
        response: BitcoinAdapterResponseWrapper::SendTransactionReject(reject),
        callback_id: 0,
    };
    state.push_response_bitcoin(adapter_response).unwrap();

    // The first consensus queue entry carries the same reject code and message.
    let expected_payload =
        Payload::Reject(RejectContext::new(RejectCode::SysTransient, error_message));
    assert_eq!(state.consensus_queue[0].payload, expected_payload);
}

#[test]
fn time_out_messages_updates_subnet_input_schedules_correctly() {
    let mut fixture = ReplicatedStateFixture::with_canisters(&[CANISTER_ID, OTHER_CANISTER_ID]);

    // Enqueue 3 outgoing best-effort requests from `CANISTER_ID`, each with a
    // distinct callback ID:
    // - one to self;
    // - one to another local canister;
    // - one to a remote canister.
    let remote_canister_id = CanisterId::from_u64(123);
    let receivers = [CANISTER_ID, OTHER_CANISTER_ID, remote_canister_id];
    for (receiver, callback_id) in receivers.iter().zip(0u64..) {
        let request = Request {
            payment: Cycles::new(13),
            sender_reply_callback: CallbackId::from(callback_id),
            ..best_effort_request_to(*receiver)
        };
        fixture.push_output_request(request, UNIX_EPOCH).unwrap();
    }

    // Advance the batch time to the maximum so that all three requests time
    // out; no refunds are expected as a result.
    fixture.state.metadata.batch_time = Time::from_nanos_since_unix_epoch(u64::MAX);
    assert_eq!(3, fixture.time_out_messages());
    assert!(fixture.state.refunds().is_empty());

    // Both local senders are in the local schedule, in unspecified order.
    assert_eq!(2, fixture.local_subnet_input_schedule(&CANISTER_ID).len());
    for local_sender in [CANISTER_ID, OTHER_CANISTER_ID] {
        let local_schedule = fixture.local_subnet_input_schedule(&CANISTER_ID);
        assert!(local_schedule.contains(&local_sender));
    }

    // The remote canister is the only entry in the remote schedule.
    assert_eq!(
        fixture.remote_subnet_input_schedule(&CANISTER_ID),
        &VecDeque::from(vec![remote_canister_id])
    );
}

/// Timing out an inbound best-effort request that carries cycles must produce
/// a refund of those cycles addressed to the request's sender.
#[test]
fn time_out_messages_generates_refunds() {
    let mut fixture = ReplicatedStateFixture::with_canisters(&[CANISTER_ID, OTHER_CANISTER_ID]);
    let remote_canister_id = CanisterId::from_u64(123);
    let payment = Cycles::new(13);

    // Inbound best-effort request from `remote_canister_id`, with cycles attached.
    let request = Request {
        payment,
        ..best_effort_request_from(remote_canister_id)
    };
    let pushed = fixture.push_input(request.into());
    assert_eq!(Ok(true), pushed);

    // Advance the batch time far enough for the request to expire.
    fixture.state.metadata.batch_time = Time::from_nanos_since_unix_epoch(u64::MAX);
    assert_eq!(1, fixture.time_out_messages());

    // Exactly one refund, returning the payment to `remote_canister_id`.
    let refunds = fixture.take_all_refunds();
    assert_eq!([Refund::anonymous(remote_canister_id, payment)], *refunds);
}

/// Best-effort requests in the subnet queues are timed out individually: with
/// the batch time set to the second request's deadline, only the first request
/// is dropped (and refunded), while the second one can still be popped.
#[test]
fn time_out_messages_in_subnet_queues() {
    let mut fixture = ReplicatedStateFixture::new();

    // Two incoming best-effort requests for `SUBNET_ID`, one second apart.
    let first_request_deadline = CoarseTime::from_secs_since_unix_epoch(1000);
    let second_request_deadline = CoarseTime::from_secs_since_unix_epoch(1001);
    for deadline in [first_request_deadline, second_request_deadline] {
        let request = Request {
            deadline,
            payment: Cycles::new(13),
            ..best_effort_request_to(SUBNET_ID.into())
        };
        assert!(fixture.push_input(request.into()).unwrap());
    }

    // At the second request's deadline, only the first request is timed out.
    fixture.state.metadata.batch_time = second_request_deadline.into();
    assert_eq!(1, fixture.time_out_messages());
    let refunds = fixture.take_all_refunds();
    assert_eq!([Refund::anonymous(CANISTER_ID, Cycles::new(13))], *refunds);

    // The second request is still enqueued; nothing else is.
    assert_matches!(
        fixture.state.pop_subnet_input(),
        Some(CanisterMessage::Request(popped)) if popped.deadline == second_request_deadline
    );
    assert_eq!(None, fixture.state.pop_subnet_input());
}

/// Exercises `enforce_best_effort_message_limit` with limits at or above the
/// current usage (nothing is shed) and with a limit equal to the mean message
/// size (all but the first enqueued message are shed and refunded).
#[test]
fn enforce_best_effort_message_limit() {
    let mut fixture = ReplicatedStateFixture::with_canisters(&[CANISTER_ID, OTHER_CANISTER_ID]);

    // Enqueue 4 best-effort incoming requests of increasing sizes (and payments)
    // for `CANISTER_ID`, `OTHER_CANISTER_ID` and `own_subnet_id` (i.e. subnet
    // queues), recording each request's size.
    let own_subnet_id = CanisterId::from(fixture.state.metadata.own_subnet_id);
    let receivers = [CANISTER_ID, OTHER_CANISTER_ID, CANISTER_ID, own_subnet_id];
    let mut message_sizes = Vec::with_capacity(receivers.len());
    for (i, receiver) in receivers.into_iter().enumerate() {
        let request = Request {
            method_name: "x".repeat(i * 10 + 1),
            payment: Cycles::new(1 << i),
            ..best_effort_request_to(receiver)
        };
        let size = NumBytes::from(request.count_bytes() as u64);
        message_sizes.push(size);
        assert!(fixture.push_input(request.into()).unwrap());
    }

    // An effectively unlimited budget sheds nothing.
    let outcome = fixture.enforce_best_effort_message_limit(u64::MAX.into());
    assert_eq!((0, 0.into()), outcome);
    assert!(fixture.state.refunds().is_empty());

    // Neither does a budget exactly equal to the current usage.
    let best_effort_memory_usage = fixture.state.best_effort_message_memory_taken();
    let outcome = fixture.enforce_best_effort_message_limit(best_effort_memory_usage);
    assert_eq!((0, 0.into()), outcome);
    assert!(fixture.state.refunds().is_empty());

    // A budget equal to the mean message size should shed everything except the
    // first message that was enqueued.
    let mean_message_size = best_effort_memory_usage / 4;
    let shed_bytes = message_sizes[1] + message_sizes[2] + message_sizes[3];
    let outcome = fixture.enforce_best_effort_message_limit(mean_message_size);
    assert_eq!((3, shed_bytes), outcome);

    // The payments of the three shed requests come back as a single refund.
    let shed_cycles = Cycles::new((1 << 1) + (1 << 2) + (1 << 3));
    let expected_refunds = [Refund::anonymous(CANISTER_ID, shed_cycles)];
    assert_eq!(expected_refunds, *fixture.take_all_refunds());

    // Enforcing the same limit again changes nothing.
    let outcome = fixture.enforce_best_effort_message_limit(mean_message_size);
    assert_eq!((0, 0.into()), outcome);
    assert!(fixture.state.refunds().is_empty());

    // Exactly one input message is left...
    assert!(fixture.pop_input().is_some());
    assert!(fixture.pop_input().is_none());
    // ...and there are no output messages at all.
    assert!(fixture.state.output_into_iter().next().is_none());
}

/// With no canisters in the state, pushing a guaranteed response is an error,
/// whereas pushing a best-effort response returns `Ok(false)`.
#[test]
fn push_best_effort_response_for_non_existent_canister_succeeds() {
    // No canisters installed.
    let mut fixture = ReplicatedStateFixture::with_canisters(&[]);

    // Guaranteed response: rejected with an error.
    let guaranteed_outcome = fixture.push_input(response_to(CANISTER_ID).into());
    assert!(guaranteed_outcome.is_err());

    // Same response, but with a deadline (i.e. best-effort): silently dropped.
    let mut best_effort_response = response_to(CANISTER_ID);
    best_effort_response.deadline = SOME_DEADLINE;
    let best_effort_outcome = fixture.push_input(best_effort_response.into());
    assert_eq!(Ok(false), best_effort_outcome);
}

/// Exercises both phases of a subnet split (`ReplicatedState::split()` followed
/// by `after_split()`) for each of the two resulting subnets.
///
/// The original state holds two canisters, a stream, an ingress history entry
/// per canister, a subnet queue input, canister inputs and refunds. For each
/// side of the split, the expected state is built by hand from the original
/// (subnet A') or from an empty state (subnet B) and compared against the
/// actual result after each phase.
#[test]
fn split() {
    // We will be splitting subnet A into A' and B.
    const SUBNET_A: SubnetId = SUBNET_ID;
    const SUBNET_B: SubnetId = SUBNET_1;

    const CANISTER_1: CanisterId = CANISTER_ID;
    const CANISTER_2: CanisterId = OTHER_CANISTER_ID;
    const CANISTERS: [CanisterId; 2] = [CANISTER_1, CANISTER_2];

    // Retain `CANISTER_1` on `SUBNET_A`, migrate `CANISTER_2` to `SUBNET_B`.
    let routing_table = RoutingTable::try_from(btreemap! {
        CanisterIdRange {start: CANISTER_1, end: CANISTER_1} => SUBNET_A,
        CanisterIdRange {start: CANISTER_2, end: CANISTER_2} => SUBNET_B,
    })
    .unwrap();

    // Fixture with 2 canisters.
    let mut fixture = ReplicatedStateFixture::with_canisters(&CANISTERS);

    // Stream with a couple of requests. The details don't matter, should be
    // retained unmodified on subnet A' only.
    fixture.push_to_stream(vec![
        request_to(CANISTER_1).into(),
        request_to(CANISTER_2).into(),
    ]);

    // Makes an `IngressHistoryState` with one `Received` message addressed to each
    // of `canisters`.
    //
    // Message and user IDs are derived from the canister's index in `CANISTERS`
    // (not in `canisters`), so a given canister's entry is the same regardless
    // of which subset is requested.
    let make_ingress_history = |canisters: &[CanisterId]| {
        let mut ingress_history = IngressHistoryState::default();
        for (i, canister) in CANISTERS.iter().enumerate() {
            if canisters.contains(canister) {
                ingress_history.insert(
                    message_test_id(i as u64),
                    IngressStatus::Known {
                        receiver: canister.get(),
                        user_id: user_test_id(i as u64),
                        time: UNIX_EPOCH,
                        state: IngressState::Received,
                    },
                    UNIX_EPOCH,
                    NumBytes::from(u64::MAX),
                    |_| {},
                );
            }
        }
        ingress_history
    };
    // Ingress history: 2 `Received` messages, addressed to canisters 1 and 2.
    // Should be retained on both sides after phase 1, split after phase 2.
    fixture.state.metadata.ingress_history = make_ingress_history(&CANISTERS);

    // Subnet queues. Should be preserved on subnet A' only.
    assert!(
        fixture
            .push_input(
                RequestBuilder::default()
                    .sender(CANISTER_1)
                    .receiver(SUBNET_A.into())
                    .build()
                    .into(),
            )
            .unwrap()
    );

    // Set up input schedules. Add a couple of input messages to each canister.
    for sender in CANISTERS {
        for receiver in CANISTERS {
            assert!(
                fixture
                    .push_input(
                        RequestBuilder::default()
                            .sender(sender)
                            .receiver(receiver)
                            .build()
                            .into(),
                    )
                    .unwrap()
            );
        }
    }
    // Before the split, both senders are local: every canister has 2 entries in
    // its local schedule and none in its remote schedule.
    for canister in CANISTERS {
        assert_eq!(2, fixture.local_subnet_input_schedule(&canister).len());
        assert_eq!(0, fixture.remote_subnet_input_schedule(&canister).len());
    }

    // Add some refunds. Should be retained on subnet A' only.
    fixture.state.add_refund(CANISTER_1, Cycles::new(100));
    fixture.state.add_refund(CANISTER_2, Cycles::new(200));

    //
    // Split off subnet A', phase 1.
    //
    // `split()` is called on a clone, so that `fixture.state` remains available
    // as the baseline for building the expected states below.
    let mut state_a = fixture
        .state
        .clone()
        .split(SUBNET_A, &routing_table, None)
        .unwrap();

    // Start off with the original state.
    let mut expected = fixture.state.clone();
    // Only `CANISTER_1` should be left.
    expected.canister_states.remove(&CANISTER_2);
    // And the split marker should be set.
    expected.metadata.split_from = Some(SUBNET_A);
    // Otherwise, the state should be the same.
    assert_eq!(expected, state_a);

    //
    // Subnet A', phase 2.
    //
    state_a.after_split();

    // Ingress history should only contain the message to `CANISTER_1`.
    expected.metadata.ingress_history = make_ingress_history(&[CANISTER_1]);
    // The input schedules of `CANISTER_1` should have been repartitioned.
    // The canister is temporarily taken out of the map, so the map passed to
    // `split_input_schedules()` holds only the other canisters (here: none).
    let mut canister_state = expected.canister_states.remove(&CANISTER_1).unwrap();
    canister_state
        .system_state
        .split_input_schedules(&CANISTER_1, &expected.canister_states);
    expected.canister_states.insert(CANISTER_1, canister_state);
    // And the split marker should be reset.
    expected.metadata.split_from = None;
    // Everything else should be the same as in phase 1.
    assert_eq!(expected, state_a);

    //
    // Split off subnet B, phase 1.
    //
    let mut state_b = fixture
        .state
        .clone()
        .split(SUBNET_B, &routing_table, None)
        .unwrap();

    // Subnet B state is based off of an empty state.
    let mut expected = ReplicatedState::new(SUBNET_B, fixture.state.metadata.own_subnet_type);
    // Only `CANISTER_2` should be left.
    expected.canister_states.insert(
        CANISTER_2,
        fixture.state.canister_state(&CANISTER_2).unwrap().clone(),
    );
    // The full ingress history should be preserved.
    // This moves the ingress history out of `fixture.state`, which is not read
    // again below.
    expected.metadata.ingress_history = fixture.state.metadata.ingress_history;
    // And the split marker should be set.
    expected.metadata.split_from = Some(SUBNET_A);
    // Otherwise, the state should be the same.
    assert_eq!(expected, state_b);

    //
    // Subnet B, phase 2.
    //
    state_b.after_split();

    // Ingress history should only contain the message to `CANISTER_2`.
    expected.metadata.ingress_history = make_ingress_history(&[CANISTER_2]);
    // The input schedules of `CANISTER_2` should have been repartitioned.
    let mut canister_state = expected.canister_states.remove(&CANISTER_2).unwrap();
    canister_state
        .system_state
        .split_input_schedules(&CANISTER_2, &expected.canister_states);
    expected.canister_states.insert(CANISTER_2, canister_state);
    // And the split marker should be reset.
    expected.metadata.split_from = None;
    // Everything else should be the same as in phase 1.
    assert_eq!(expected, state_b);
}

/// Every `InputSource` variant must survive a conversion to the protobuf
/// `NextInputQueue` and back.
#[test]
fn input_source_roundtrip() {
    use ic_protobuf::state::queues::v1::canister_queues::NextInputQueue;

    InputSource::iter().for_each(|source| {
        let decoded = InputSource::from(NextInputQueue::from(&source));
        assert_eq!(source, decoded);
    });
}

/// Pins the numeric values of the `InputSource` variants.
#[test]
fn compatibility_for_input_source() {
    // If this fails, you are making a potentially incompatible change to `NextInputQueue`.
    // See note [Handling changes to Enums in Replicated State] for how to proceed.
    let discriminants: Vec<i32> = InputSource::iter().map(|source| source as i32).collect();
    assert_eq!(discriminants, [0, 1, 2]);
}

/// Walks `CANISTER_ID` through a sequence of states and checks, at each step,
/// whether `ReplicatedState::ready_for_migration()` reports it as ready.
#[test]
fn ready_for_migration() {
    let mut fixture = ReplicatedStateFixture::with_canisters(&[CANISTER_ID, OTHER_CANISTER_ID]);
    // Shorthand for the predicate under test.
    let is_ready =
        |fixture: &ReplicatedStateFixture| fixture.state.ready_for_migration(&CANISTER_ID);

    // A running canister is not ready for migration.
    assert!(!is_ready(&fixture));

    // Stopped, but with a message in its input queue: not ready.
    fixture
        .push_input(best_effort_request_from(OTHER_CANISTER_ID).into())
        .unwrap();
    fixture.stop_canister();
    assert!(!is_ready(&fixture));

    // Input queue drained, but with a message in its output queue: not ready.
    fixture.pop_input().unwrap();
    fixture.push_output_response(best_effort_response_to(OTHER_CANISTER_ID));
    assert!(!is_ready(&fixture));

    // Output queue drained as well: ready.
    let best_effort_response = fixture.pop_output().unwrap();
    assert!(is_ready(&fixture));

    // A best-effort response in the stream does not change that.
    fixture.push_to_stream(vec![best_effort_response]);
    assert!(is_ready(&fixture));

    // Neither does a guaranteed response from a different canister.
    fixture.push_to_stream(vec![response_from(OTHER_CANISTER_ID).into()]);
    assert!(is_ready(&fixture));

    // A guaranteed response from the canister itself in the stream: not ready.
    fixture.push_to_stream(vec![response_to(OTHER_CANISTER_ID).into()]);
    assert!(!is_ready(&fixture));

    // Discarding the two messages ahead of the guaranteed response leaves it in
    // the stream: still not ready.
    fixture.discard_from_stream(2);
    assert!(!is_ready(&fixture));

    // Discarding the guaranteed response too: ready again.
    fixture.discard_from_stream(1);
    assert!(is_ready(&fixture));
}

/// `credit_refund()` must add the refunded cycles to an existing canister's
/// balance and return `true`; for a canister that is not in the state it must
/// return `false` and leave all balances untouched.
#[test]
fn credit_refund() {
    let mut fixture = ReplicatedStateFixture::with_canisters(&[CANISTER_ID]);
    let initial_balance = fixture.canister_balance(&CANISTER_ID).unwrap();
    let expected_balance = Some(initial_balance + Cycles::new(10));

    // A refund of 10 cycles to `CANISTER_ID` is credited to its balance.
    let refund = Refund::anonymous(CANISTER_ID, Cycles::new(10));
    assert!(fixture.state.credit_refund(&refund));
    assert_eq!(expected_balance, fixture.canister_balance(&CANISTER_ID));

    // A refund to a non-existent canister is not credited...
    let orphan_refund = Refund::anonymous(OTHER_CANISTER_ID, Cycles::new(11));
    assert!(!fixture.state.credit_refund(&orphan_refund));
    // ...and changes neither the existing balance, nor the set of canisters.
    assert_eq!(expected_balance, fixture.canister_balance(&CANISTER_ID));
    assert_eq!(None, fixture.canister_balance(&OTHER_CANISTER_ID));
}

/// `next()` must always return the message that the preceding `peek()` showed;
/// and draining the iterator must yield every message, leaving no output behind.
#[test_strategy::proptest]
fn peek_and_next_consistent(
    #[strategy(arb_replicated_state_with_output_queues(SUBNET_ID, 10, 10, Some(5)))]
    state_and_queues: (
        ReplicatedState,
        VecDeque<VecDeque<RequestOrResponse>>,
        usize,
    ),
) {
    let (mut replicated_state, _, total_requests) = state_and_queues;

    let mut num_requests = 0;
    {
        // The iterator borrows the state mutably; keep it confined to this scope.
        let mut output_iter = replicated_state.output_into_iter();
        while let Some(peeked) = output_iter.peek().cloned() {
            prop_assert_eq!(Some(peeked), output_iter.next());
            num_requests += 1;
        }
    }

    prop_assert_eq!(total_requests, num_requests);
    prop_assert_eq!(replicated_state.output_message_count(), 0);
}

/// Replicated state with multiple canisters, each with multiple output queues
/// of size 1. While iterating, every `exclude_step`-th peeked message has its
/// queue excluded; all other messages are consumed.
///
/// Expect consumed + excluded to add up to the initial message count; and
/// exactly the excluded messages to still be in the state afterwards.
#[test_strategy::proptest]
fn peek_and_next_consistent_with_exclude_queue(
    #[strategy(arb_replicated_state_with_output_queues(SUBNET_ID, 10, 10, None))] state_and_queues: (
        ReplicatedState,
        VecDeque<VecDeque<RequestOrResponse>>,
        usize,
    ),
    #[strategy(0..=1)] start: i32,
    #[strategy(2..=5)] exclude_step: i32,
) {
    let (mut replicated_state, _, total_requests) = state_and_queues;

    let mut excluded = 0;
    let mut consumed = 0;
    {
        let mut output_iter = replicated_state.output_into_iter();

        let mut step = start;
        while let Some(peeked) = output_iter.peek().cloned() {
            step += 1;
            if step % exclude_step != 0 {
                // Consume the message; it must be the one that was just peeked.
                prop_assert_eq!(Some(peeked), output_iter.next());
                consumed += 1;
            } else {
                // Skip over the message's queue, leaving the message in place.
                output_iter.exclude_queue();
                excluded += 1;
            }
        }
    }

    prop_assert_eq!(total_requests, excluded + consumed);
    prop_assert_eq!(replicated_state.output_message_count(), excluded);
}

/// The output iterator must yield messages in the order obtained by cycling
/// through the (non-empty) raw queues and taking the front message of each;
/// and it must yield all of them.
#[test_strategy::proptest]
fn iter_yields_correct_elements(
    #[strategy(arb_replicated_state_with_output_queues(SUBNET_ID, 10, 10, None))] state_and_queues: (
        ReplicatedState,
        VecDeque<VecDeque<RequestOrResponse>>,
        usize,
    ),
) {
    let (mut replicated_state, mut raw_requests, _total_requests) = state_and_queues;

    for msg in replicated_state.output_into_iter() {
        // The next non-empty raw queue is the one `msg` must have come from.
        // Empty queues are dropped along the way.
        let mut queue = loop {
            let candidate = raw_requests.pop_front().unwrap();
            if !candidate.is_empty() {
                break candidate;
            }
        };

        match queue.pop_front() {
            Some(expected) => {
                prop_assert_eq!(
                    &msg,
                    &expected,
                    "Popped message does not correspond with expected message. popped: {:?}. expected: {:?}.",
                    msg,
                    expected
                );
            }
            None => {
                prop_assert!(
                    false,
                    "Pop yielded an element that was not contained in the respective queue"
                );
            }
        }

        // Move the queue to the back, behind all other queues.
        raw_requests.push_back(queue);
    }

    // Ensure that actually all elements have been consumed.
    let leftover: usize = raw_requests.iter().map(VecDeque::len).sum();
    prop_assert_eq!(leftover, 0);
    prop_assert_eq!(replicated_state.output_message_count(), 0);
}

/// Iterates over the output messages while excluding the queue of every
/// `ignore_step`-th peeked message, mirroring each step on `raw_requests`.
///
/// Checks that consumed messages match the mirrored raw queues; that the
/// messages left in the state afterwards are exactly the ignored ones; and
/// that, once those are popped explicitly, no output messages remain.
#[test_strategy::proptest]
fn iter_with_exclude_queue_yields_correct_elements(
    #[strategy(arb_replicated_state_with_output_queues(SUBNET_ID, 10, 10, None))] state_and_queues: (
        ReplicatedState,
        VecDeque<VecDeque<RequestOrResponse>>,
        usize,
    ),
    #[strategy(0..=1)] start: i32,
    #[strategy(2..=5)] ignore_step: i32,
) {
    let (mut replicated_state, mut raw_requests, total_requests) = state_and_queues;

    let mut consumed = 0;
    let mut ignored_requests = Vec::new();
    // Check whether popping elements with ignores in between yields the expected messages
    {
        let mut output_iter = replicated_state.output_into_iter();

        let mut i = start;
        while let Some(msg) = output_iter.peek() {
            // Find the next non-empty raw queue, discarding empty ones.
            let mut requests = raw_requests.pop_front().unwrap();
            while requests.is_empty() {
                requests = raw_requests.pop_front().unwrap();
            }

            i += 1;
            if i % ignore_step == 0 {
                // Popping the front of the requests will amount to the same as ignoring as
                // we use queues of size one in this test.
                let popped = requests.pop_front().unwrap();
                prop_assert_eq!(msg, &popped);
                output_iter.exclude_queue();
                // Remember the ignored message, it must still be in the state at the end.
                ignored_requests.push(popped);
                // We push the queue to the front as the canister gets another chance if one
                // of its queues are ignored in the current implementation.
                raw_requests.push_front(requests);
                continue;
            }

            // Not ignored: consume the message and compare it against the raw queue.
            let msg = output_iter.next().unwrap();
            if let Some(raw_msg) = requests.pop_front() {
                consumed += 1;
                prop_assert_eq!(
                    &msg,
                    &raw_msg,
                    "Popped message does not correspond with expected message. popped: {:?}. expected: {:?}.",
                    msg,
                    raw_msg
                );
            } else {
                prop_assert!(
                    false,
                    "Pop yielded an element that was not contained in the respective queue"
                );
            }

            raw_requests.push_back(requests);
        }
    }

    let remaining_output = replicated_state.output_message_count();

    // Whatever was not consumed must be exactly the ignored messages.
    prop_assert_eq!(remaining_output, total_requests - consumed);
    prop_assert_eq!(remaining_output, ignored_requests.len());

    // Pop every ignored message from its sender's output queue; senders that are
    // not canisters in the state are looked up in the subnet queues.
    for raw in ignored_requests {
        let queues = if let Some(canister) = replicated_state.canister_states.get_mut(&raw.sender())
        {
            canister.system_state.queues_mut()
        } else {
            replicated_state.subnet_queues_mut()
        };

        let msg = queues.pop_canister_output(&raw.receiver()).unwrap();
        prop_assert_eq!(raw, msg);
    }

    prop_assert_eq!(replicated_state.output_message_count(), 0);
}

/// Excluding the queue of every peeked message (i.e. never consuming anything)
/// must leave the replicated state exactly as it was.
#[test_strategy::proptest]
fn ignore_leaves_state_untouched(
    #[strategy(arb_replicated_state_with_output_queues(SUBNET_ID, 10, 10, Some(5)))]
    state_and_queues: (
        ReplicatedState,
        VecDeque<VecDeque<RequestOrResponse>>,
        usize,
    ),
) {
    let mut replicated_state = state_and_queues.0;
    let expected_state = replicated_state.clone();

    let mut output_iter = replicated_state.output_into_iter();
    loop {
        if output_iter.peek().is_none() {
            break;
        }
        output_iter.exclude_queue();
    }
    // Release the iterator's borrow of the state before comparing.
    drop(output_iter);

    prop_assert_eq!(expected_state, replicated_state);
}

/// A peek loop that excludes the queue of every `ignore_step`-th peeked message
/// and consumes all other messages must terminate.
#[test_strategy::proptest]
fn peek_next_loop_with_exclude_queue_terminates(
    #[strategy(arb_replicated_state_with_output_queues(SUBNET_ID, 10, 10, Some(5)))]
    state_and_queues: (
        ReplicatedState,
        VecDeque<VecDeque<RequestOrResponse>>,
        usize,
    ),
    #[strategy(0..=1)] start: i32,
    #[strategy(2..=5)] ignore_step: i32,
) {
    let (mut replicated_state, _, _) = state_and_queues;

    let mut output_iter = replicated_state.output_into_iter();

    let mut step = start;
    while output_iter.peek().is_some() {
        step += 1;
        if step % ignore_step != 0 {
            output_iter.next();
        } else {
            output_iter.exclude_queue();
        }
    }
}

/// After timing out messages at an arbitrary batch time, consuming the output
/// iterator must terminate and yield every message that was not timed out.
#[test_strategy::proptest]
fn iter_with_stale_entries_terminates(
    #[strategy(arb_replicated_state_with_output_queues(SUBNET_ID, 10, 10, Some(5)))]
    state_and_queues: (
        ReplicatedState,
        VecDeque<VecDeque<RequestOrResponse>>,
        usize,
    ),
    #[strategy(any::<u32>())] batch_time_seconds: u32,
) {
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    let (mut replicated_state, _, total_requests) = state_and_queues;

    // Time out whatever has expired as of the arbitrary batch time.
    let batch_time_nanos = u64::from(batch_time_seconds) * NANOS_PER_SEC;
    replicated_state.metadata.batch_time = Time::from_nanos_since_unix_epoch(batch_time_nanos);
    let timed_out_messages = time_out_messages(&mut replicated_state);

    // Just consume all output messages.
    //
    // We cannot check the exact ordering because timing out some messages messes it
    // up, both across canisters and across a canister's output queues.
    let output_messages = replicated_state.output_into_iter().count();

    // Every message was either timed out or output; and nothing is left.
    prop_assert_eq!(total_requests, timed_out_messages + output_messages);
    prop_assert_eq!(replicated_state.output_message_count(), 0);
}

/// After timing out messages at an arbitrary batch time, a `peek()` / `next()`
/// loop must terminate, with `next()` always returning the peeked message, and
/// must yield every message that was not timed out.
#[test_strategy::proptest]
fn peek_next_loop_with_stale_entries_terminates(
    #[strategy(arb_replicated_state_with_output_queues(SUBNET_ID, 10, 10, Some(5)))]
    state_and_queues: (
        ReplicatedState,
        VecDeque<VecDeque<RequestOrResponse>>,
        usize,
    ),
    #[strategy(any::<u32>())] batch_time_seconds: u32,
) {
    const NANOS_PER_SEC: u64 = 1_000_000_000;

    let (mut replicated_state, _, total_requests) = state_and_queues;

    // Time out whatever has expired as of the arbitrary batch time.
    let batch_time_nanos = u64::from(batch_time_seconds) * NANOS_PER_SEC;
    replicated_state.metadata.batch_time = Time::from_nanos_since_unix_epoch(batch_time_nanos);
    let timed_out_messages = time_out_messages(&mut replicated_state);

    let mut output_messages = 0;
    {
        // The iterator borrows the state mutably; keep it confined to this scope.
        let mut output_iter = replicated_state.output_into_iter();
        while let Some(peeked) = output_iter.peek().cloned() {
            prop_assert_eq!(Some(peeked), output_iter.next());
            output_messages += 1;
        }
    }

    // Every message was either timed out or output; and nothing is left.
    prop_assert_eq!(total_requests, timed_out_messages + output_messages);
    prop_assert_eq!(replicated_state.output_message_count(), 0);
}
